Python Basic Runtime

Introduction

The Python Basic Runtime allows you to develop functions in Python that can then be used in a KFlow Workflow. These functions are encapsulated in an image which is then run in a Pod by the KFlow Runtime, either in batch mode or stream mode, depending on the configuration of the workflow.

To use the Python Basic Runtime, you need to:

  1. Get the Python dependency for IDEs.
  2. Derive the ProcessFunction class.
  3. Assign it to a BasicRuntime object and call the BasicRuntime#run method.
  4. Create a Docker image that runs your main Python file.
  5. Use the image in a KFlow Workflow.

A demonstration Python project is provided with KFlow, and excerpts from it are used below.

I - Get the Python dependency for IDEs

The following has to be done only once. With the demo Python project, the first step is unnecessary because kfbasicpy.tar.gz is already provided in the dist folder.

# extract the dependencies from the runtime image
cat > Dockerfile.kf-basicpy-dist <<EOF
FROM kflow-basic-python-runtime:dev AS runtime
FROM scratch AS dependencies
COPY --from=runtime /dist/kfbasicpy.tar.gz .
EOF

docker buildx build --output=dist --target=dependencies . -f Dockerfile.kf-basicpy-dist

rm Dockerfile.kf-basicpy-dist

# make APIs visible to IDEs

## install the essential package
pip install dist/kfbasicpy.tar.gz

## install with the ner extra
pip install "dist/kfbasicpy.tar.gz[ner]"

## install with the ner and extract extras
pip install "dist/kfbasicpy.tar.gz[ner,extract]"

II - Derive the class ProcessFunction

To create a ProcessFunction, derive the class of the same name. When doing so, you have a few methods to implement:

  • open(self, ctx: OpenContext): This method is called once when the application starts; use it, for example, to create clients.
    • The OpenContext provides three functions:
      • get_meta returns the configuration provided to your task under the key meta in the Pipeline.
      • upload(self: Self, file_name: str, bucket: str, key: str, **kwargs: str) -> bool uploads a document to S3 (including user metadata if needed).
      • download(self: Self, bucket: str, key: str) -> DownloadResult downloads a file from S3 (including user metadata if any exists) and returns its local path and metadata.
  • process(self, event_dict: dict[str, Any], ctx: Context) -> dict[str, Any] | list[dict[str, Any]]: This is the processing method, applied to each piece of data received.
    • The event_dict is the dictionary corresponding to the input data.
    • The Context gives you access to:
      • The local path of the file that was auto-downloaded, via get_downloader_local_object_url.
      • The user metadata of the file that was auto-downloaded, via get_property.
      • A method to request manual acknowledgement of inputs, and an associated method flush(self, *paths: str) to delete the local files provided.
    • To send data to an output, simply return your data as a list of dictionaries or a single dictionary. The keys should match the output schema specified in the workflow configuration.
  • close(self): This method is called when the application is stopping; use it to close any connections or clients.

The demo project contains a ProcessCSVFunction class that reads each CSV file provided as input (autoDownload must be set to true in the globalConfig of the task using this function. See autoDownload documentation) and outputs only the first two columns, renamed "a" and "b".

import csv
import logging
from typing import Any

from kfbasicpy.function.context import Context
from kfbasicpy.function.function import ProcessFunction
from kfbasicpy.function.open_context import OpenContext


logger = logging.getLogger(__name__)


class ProcessCSVFunction(ProcessFunction):

    def open(self, ctx: OpenContext) -> None:
        """Called once on startup."""
        logger.info("opened")

    def process(self, event_dict: dict[str, Any], ctx: Context) -> dict[str, Any] | list[dict[str, Any]]:
        """Process an input event.

        - The received dict conforms to the schema defined in the previous task.
        - The output dict should conform to the schema defined either in the
          process function task, or in the next function receiving this
          process function's result as input.
        In this example, each output dict contains two fields named
        a (type=string) and b (type=string).
        """
        logger.info(f"event = {event_dict}, context = {ctx}")
        file_to_process_local_path = ctx.get_downloader_local_object_url()

        event_list: list[dict[str, Any]] = []
        with open(file_to_process_local_path, newline='') as csv_file:
            reader = csv.reader(csv_file, delimiter=',')
            for row in reader:
                # obviously, some error handling should be added here
                event_list.append({"a": row[0], "b": row[1]})

        return event_list
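The core CSV transformation can be exercised outside the runtime. The standalone sketch below reproduces the same logic (the temporary file and its sample rows are made up for illustration):

```python
import csv
import tempfile
from pathlib import Path
from typing import Any

# Hypothetical sample input; any CSV with at least two columns works.
sample = "url1,123,extra\nurl2,456,extra\n"


def first_two_columns(csv_path: str) -> list[dict[str, Any]]:
    """Mirror ProcessCSVFunction.process: keep columns 0 and 1, renamed 'a' and 'b'."""
    events: list[dict[str, Any]] = []
    with open(csv_path, newline='') as csv_file:
        for row in csv.reader(csv_file, delimiter=','):
            if len(row) >= 2:  # minimal error handling
                events.append({"a": row[0], "b": row[1]})
    return events


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "input.csv"
    path.write_text(sample)
    records = first_two_columns(str(path))

print(records)  # [{'a': 'url1', 'b': '123'}, {'a': 'url2', 'b': '456'}]
```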

Use the following Avro schema in your specification:

{
  "namespace": "myns",
  "type": "record",
  "name": "myname",
  "fields": [
    {"name": "a", "type": "string"},
    {"name": "b", "type": "string"}
  ]
}
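As a quick sanity check, the records returned by process can be compared against this schema using only the standard library. This is a minimal structural check (field names and a small primitive-type mapping), not a full Avro validation:

```python
import json

# The Avro schema from the specification above.
schema = json.loads("""
{
  "namespace": "myns",
  "type": "record",
  "name": "myname",
  "fields": [
    {"name": "a", "type": "string"},
    {"name": "b", "type": "string"}
  ]
}
""")

# Subset of Avro primitive types mapped to Python types.
avro_to_py = {"string": str, "long": int}


def matches_schema(record: dict) -> bool:
    """True if the record has exactly the schema's fields with matching types."""
    fields = {f["name"]: avro_to_py[f["type"]] for f in schema["fields"]}
    return set(record) == set(fields) and all(
        isinstance(record[name], typ) for name, typ in fields.items()
    )


print(matches_schema({"a": "x", "b": "y"}))   # True
print(matches_schema({"a": "x", "size": 3}))  # False: wrong field names
```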

III - Assign your function to a runtime

This example shows how to set the logging level with an environment variable. You could also use an environment variable to select the function to run.

from os import getenv

from kfbasicpy.runtime.basic import BasicRuntime
from kfbasicpy.function.forward_function import ForwardProcessFunction
from kfbasicpy.function.print_function import PrintProcessFunction
from kfbasicpy.function.txtextract_function import TxtExtractFunction

from function.process_csv import ProcessCSVFunction

if __name__ == "__main__":
    import logging
    import sys

    logging.basicConfig(
        format='[%(name)s]: (%(asctime)s) => %(message)s',
        datefmt='%m/%d/%Y %I:%M:%S %p',
        level=logging.getLevelName(getenv("KFLOW_RUNTIME_LOG_LEVEL", "INFO")),
        stream=sys.stdout
    )

    runtime = BasicRuntime()
    runtime.add_process_function(ProcessCSVFunction())
    runtime.run()

Notice that you can add multiple ProcessFunctions. Each is called with the same input, and their results are concatenated in an unspecified order.

IV - Create a Docker image

You need to create a Docker image that runs your Python code.

FROM kflow-basic-python-runtime:dev

# install the ner and extract additional dependencies
RUN pip install "kfbasicpy.tar.gz[ner,extract]"

# install your dependencies
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# include your python code
COPY src src

ENV KFLOW_RUNTIME_LOG_LEVEL=INFO

# Add additional environment variables if needed, for instance to select the default function to run
# ENV <YOUR_VARIABLE>=forward

ENTRYPOINT ["python", "-u", "src/main.py"]

Then build the image and push it to your registry:

docker image build -t myregistry.example.com/project/process-function-python:1.0 .
docker image push myregistry.example.com/project/process-function-python:1.0

V - Use the image in a KFlow Workflow

The following workflow reads CSV files from the docs bucket, processes them with the ProcessCSVFunction above, and writes the result to the testjson bucket. Notice the image used in the demo task.

parallelism: 1
globalConfig:
  store:
    autoDownload: false
    addr: minio.kosmos.svc.cluster.local:9000
    user: minioadmin
    password: minioadmin

mode: stream
dag:
  - id: list-s3-bucket
    type: fs-source
    format: list
    addr: minio.kosmos.svc.cluster.local:9000
    user: minioadmin
    password: minioadmin
    topics: s3a://docs
    meta:
      checkpoint: true
    schema: |
      {
        "type": "record",
        "name": "s3list",
        "namespace": "tech.athea.kosmos",
        "fields" : [
          {"name": "url", "type": "string"},
          {"name": "lastModifiedUnixMilli", "type": "long"},
          {"name": "bucket", "type": "string"},
          {"name": "key", "type": "string"}
        ]
      }
    out:
      - demo
  - id: demo
    type: raw
    image: myregistry.example.com/project/process-function-python:1.0
    imagePullPolicy: IfNotPresent
    globalConfig:
      store:
        # should not be set globally, else s3-sink would expect an input
        # schema like the output schema of list-s3-bucket
        autoDownload: true
    schema: |
      {
        "type": "record",
        "name": "necessaryButNotUsed",
        "namespace": "alsoNecessaryButNotUsed",
        "fields" : [
          {"name": "a", "type": "string"},
          {"name": "b", "type": "string"}
        ]
      }
    meta:
      container:
        env:
          - name: KFLOW_RUNTIME_LOG_LEVEL
            value: DEBUG
          - name: PYTHONUNBUFFERED
            value: "1"
    out:
      - s3
  - id: s3
    type: s3-sink
    image: myregistry.example.com/basicjavaruntime:latest
    imagePullPolicy: Never
    addr: minio.kosmos.svc.cluster.local:9000
    user: minioadmin
    password: minioadmin
    # bucket to write into
    topics: s3a://testjson
    meta:
      bulk.enabled: true
      bulk.flush.timer: 1000
      bulk.flush.size: 1000
      container:
        env:
          - name: KFLOW_RUNTIME_LOG_LEVEL
            value: TRACE
    schema: |
      {
        "type": "record",
        "name": "necessaryButNotUsed",
        "namespace": "alsoNecessaryButNotUsed",
        "fields" : [
          {"name": "a", "type": "string"},
          {"name": "b", "type": "string"}
        ]
      }
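Since every entry in a task's out list must name an existing task id, a quick consistency check can catch wiring mistakes before deployment. The dicts below are a hand-written mirror of the dag above, keeping only the ids and out edges:

```python
# Plain-dict mirror of the dag above (ids and out edges only).
dag = [
    {"id": "list-s3-bucket", "out": ["demo"]},
    {"id": "demo", "out": ["s3"]},
    {"id": "s3", "out": []},
]

ids = {task["id"] for task in dag}

# Collect every (source, target) edge whose target is not a known task id.
dangling = [
    (task["id"], target)
    for task in dag
    for target in task.get("out", [])
    if target not in ids
]
print(dangling)  # [] means every out edge points at a real task
```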

Advanced Configurations

Python inlining

For simple computations, the default Runtime allows you to inline the process function in the topology. Here is an example:

  - id: name-of-function
    type: raw
    image: hosted-registry.technique.artemis/basicpythonruntime:1.5.1
    imagePullPolicy: IfNotPresent
    schema: |
      {
        "type": "record",
        "name": "fakevalidator",
        "namespace": "tech.athea",
        "fields": [
          { "name": "rasterPath", "type": "string" },
          { "name": "detector", "type": "string" },
          { "name": "geoFence", "type": "string" },
          { "name": "traceId", "type": "string" },
          { "name": "additionalDataPath", "type": "string" },
          { "name": "acquisitionTimeStamp", "type": "long" },
          { "name": "ingestionTimeStamp", "type": "long" },
          { "name": "processingStartTimeStamp", "type": "long" }
        ]
      }
    meta:
      name: process
      code: |-
        from typing import Any
        from kfbasicpy.function.context import Context

        def process(ctx: Context, event: dict[str, Any]) -> dict[str, Any]:
            from uuid import uuid4
            from datetime import datetime, timedelta
            record = {
                "rasterPath": f"{event['key']}",
                "detector": "aircraft",
                "geoFence": "",
                "traceId": f"{uuid4()}",
                "additionalDataPath": "",
                "acquisitionTimeStamp": int((datetime.now() - timedelta(days=10)).timestamp() * 1000),
                "ingestionTimeStamp": int((datetime.now() - timedelta(hours=2)).timestamp() * 1000),
                "processingStartTimeStamp": int((datetime.now() - timedelta(minutes=30)).timestamp() * 1000),
            }
            print(record)
            return record
    out:
      - name-of-output-function

Note that meta uses the attribute code, and that the attribute name is the name of the function to call. Unless you have a good reason to do otherwise, it is recommended to name the function process.
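The inlined body can be exercised locally before pasting it into the topology. The sketch below repeats the function with the kfbasicpy Context import dropped (so it runs without kfbasicpy installed, with ctx typed as Any) and a made-up event key:

```python
from typing import Any
from uuid import uuid4
from datetime import datetime, timedelta


def process(ctx: Any, event: dict[str, Any]) -> dict[str, Any]:
    # Same body as the inlined function, minus the kfbasicpy Context import.
    record = {
        "rasterPath": f"{event['key']}",
        "detector": "aircraft",
        "geoFence": "",
        "traceId": f"{uuid4()}",
        "additionalDataPath": "",
        "acquisitionTimeStamp": int((datetime.now() - timedelta(days=10)).timestamp() * 1000),
        "ingestionTimeStamp": int((datetime.now() - timedelta(hours=2)).timestamp() * 1000),
        "processingStartTimeStamp": int((datetime.now() - timedelta(minutes=30)).timestamp() * 1000),
    }
    return record


# Hypothetical event; in the workflow, 'key' comes from the upstream task.
rec = process(None, {"key": "docs/image-001.tif"})
print(rec["rasterPath"])
```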

GPU configuration

> configure nvidia on host
https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html

> troubleshoot

Failed to initialize NVML: Driver/library version mismatch
NVML library version: 535.183

==> This is caused by the host system auto-updating the NVIDIA driver libraries, producing a version conflict at OS level. Rebooting the host system fixes the issue.
==> To prevent this from happening again, disable auto-updates for the NVIDIA drivers, then reboot the system:

export VERSION=535
sudo apt-mark hold nvidia-dkms-${VERSION}
sudo apt-mark hold nvidia-driver-${VERSION}
sudo apt-mark hold nvidia-utils-${VERSION}

In your Dockerfile, replace the FROM line with the following:

FROM kflow-basic-python-runtime.gpu:dev